跳到主要内容

Java 并发编程-线程池 ThreadPoolExecutor

前置知识

首先明确,线程池不是越大越好,一般是根据任务的性质来判断是否扩大线程池的

IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2

CPU 密集型任务(大量复杂的运算):应当分配较少的线程,比如 CPU 个数相当的大小。

实际上 Java有自带的线程池工具,Java里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService

JAVA中创建线程池主要有两类方法,一类是通过 Executors 工厂类提供的方法,该类提供了4种不同的线程池可供使用。另一类是通过 ThreadPoolExecutor 类进行自定义创建。

首先来看一下 ThreadPoolExecutor 的UML类图,了解下ThreadPoolExecutor的继承关系。

ThreadPoolExecutor 实现的顶层接口是 Executor

顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。

ExecutorService 接口增加了一些能力:

  1. 扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future 的方法;
  2. 提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor 的原理

ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:

1、直接申请线程执行该任务; 2、缓冲到队列中等待线程执行; 3、拒绝该任务。

线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

execute 方法的执行

再来分析一下 execute 方法。线程池使用 executor.execute(worker) 来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();

// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}

通过下图可以更好的对上面这 3 步做一个展示

线程池的状态

线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

ThreadPoolExecutor 类中定义了一个 volatile int 变量 runState 来表示线程池的状态,这些状态和线程池密切相关

这 5种状态如下所示

  • RUNNING 运行状态,指可以接受任务执行队列里的任务
  • SHUTDOWN 指调用了 shutdown() 方法,不再接受新任务了,但是队列里的任务得执行完毕。
  • STOP 指调用了 shutdownNow() 方法,不再接受新任务,同时抛弃阻塞队列里的所有任务并中断所有正在执行任务。
  • TIDYING 所有任务都执行完毕,在调用 shutdown()/shutdownNow() 中都会尝试更新为这个状态。
  • TERMINATED 终止状态,当执行 terminated() 后会更新为这个状态(也是内部方法,一般外部是通过 isTerminated() 方法判别是否到了这个状态)。

其生命周期转换如下入所示:

5cd1d2aa81655.jpg

ThreadPoolExecutor 的构造方法

这个 ThreadPoolExecutor 自带的四个构造方法

public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}

ThreadPoolExecutor 的重要参数

corePoolSize:核心池的大小,就是设置初始线程池的线程数量,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中,默认情况下的线程池是没有线程的,只有任务来之后才会创建线程

核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;该值等于核心线程数量 + 非核心线程数量。

keepAliveTime:非核心线程闲置超时时长。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize;

unit:参数 keepAliveTime 的时间单位(TimeUnit 枚举)

  • NANOSECONDS : 1微毫秒 = 1微秒 / 1000
  • MICROSECONDS : 1微秒 = 1毫秒 / 1000
  • MILLISECONDS : 1毫秒 = 1秒 /1000
  • SECONDS : 秒
  • MINUTES : 分
  • HOURS : 小时
  • DAYS : 天

workQueue:阻塞队列,维护着 等待执行的 Runnable 任务对象

常用的几个阻塞队列(BlockingQueue 接口):

  1. LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认大小是 Integer.MAX_VALUE,也可以指定大小。
  2. ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
  3. SynchronousQueue:同步队列,内部容量为0,每个 put 操作必须等待一个 take 操作,反之亦然。
  4. DelayQueue:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。

threadFactory:执行程序创建新线程时要使用的工厂

handler:饱和策略。关于饱和策略下面单独介绍一下。

ThreadPoolExecutor 饱和策略

ThreadPoolExecutor 饱和策略(或者说是:拒绝处理策略)定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:

handler:拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略(ThreadPoolExecutor下面的静态类),四种拒绝处理的策略为 :

  • ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

一个简单的线程池 Demo

首先创建一个 Runnable 接口的实现类

import java.util.Date;

public class MyRunnable implements Runnable {

private String command;

public MyRunnable(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return this.command;
}
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize:最大线程数 10
  3. keepAliveTime: 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy。

关闭线程池

参考资料 如何优雅的使用和理解线程池

前面开辟的线程池,就算是任务完成时也不会自动关闭,需要手动关闭线程。一般使用下面两种方法

  • shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

注意,这里的 shutdown() 方法就像异步任务一样,它不是一直等待关闭的,而是发送一个信号给线程池,让它在执行完成后自己关闭。所以可以在这个 shutdown() 后面加上一些判断任务是否执行完成(例如需要使用到 await 的场景)

long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
pool.execute(new Job());
}

pool.shutdown();

// 例如这里加个 while 循环来等待任务执行完
while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程还在执行。。。");
}

long end = System.currentTimeMillis();
LOGGER.info("一共处理了【{}】", (end - start));

线程池获取返回结果

Java 多线程编程中,经常使用的 Thread 的 Runnable() 虽然被经常使用,但其有一个弊端,就是因为无法直接获取该线程的返回值,因为 Runnable 内的 run 方法,被定义为 void 类型,如果开发者需要在线程中处理耗时操作并获得结果,那么必须自己实现一套结果获取的途径,这其实增加了开发者的代码工作量,也可能会因为对线程的不熟悉,造成不必要的代码错误(线程的同步等等问题)。

可是,绝大多情况跑完 Java 的线程 run 后,并不是让它啥都不干的 void,而是希望获得运行结果。因此,从 Java 5开始,Java在语言层级增加了支持线程返回结果的 Future、Callable,用以支持和解决上述问题,完事线程编程模型。

public class MyExecutorService {

private final int NUMBER = 3;

public MyExecutorService() {

// 创建容量为NUMBER的线程池。
ExecutorService pool = Executors.newFixedThreadPool(NUMBER);
// 创建一个 Future 队列
ArrayList<Future<String>> futures = new ArrayList<Future<String>>();

for (int i = 0; i < 10; i++) {

AThread t = new AThread(i);

Future<String> f = pool.submit(t);
futures.add(f);
}

System.out.println("获取结果中...");
for (Future<String> f : futures) {
try {
// if(f.isDone())
System.out.println(f.get());
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("得到结果.");

// 关闭线程池。
pool.shutdown();
}

private class AThread implements Callable<String> {

private int id;

public AThread(int id) {
this.id = id;
}

@Override
public String call() {
System.out.println("线程:" + id + " -> 运行...");

try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("线程:" + id + " -> 结束.");

return "返回的字符串" + id;
}
}

public static void main(String[] args) {
new MyExecutorService();
}
}

FixedRate 和 FixedDelay区别

FixedRate 是指任务总是以固定时间间隔触发,不管任务执行多长时间

而 FixedDelay 是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

因此,使用 ScheduledThreadPool 时,我们要根据需要选择执行一次、FixedRate 执行还是 FixedDelay 执行。

Reference

参考资料 线程池原理分析 参考资料 JAVA自带的4种线程池 参考资料 第十二章 线程池原理 参考资料 Java线程池实现原理及其在美团业务中的实践